//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// aggregation_executor.cpp
//
// Identification: src/execution/aggregation_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include <memory>
#include <vector>

#include "execution/executors/aggregation_executor.h"

namespace bustub {

AggregationExecutor::AggregationExecutor(ExecutorContext *exec_ctx, const AggregationPlanNode *plan,
                                         std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      child_executor_(std::move(child_executor)),
      aht_(plan_->GetAggregates(), plan_->GetAggregateTypes()),
      aht_iterator_(aht_.Begin()) {}

void AggregationExecutor::Init() {
  // LOG_DEBUG("Init");
  child_executor_->Init();
  aht_.Clear();
  Tuple tuple;
  RID rid;

  // child_executor_也有可能是聚合算子，调用Next之前需要先调用Init
  while (child_executor_->Next(&tuple, &rid)) {
    // LOG_DEBUG("child_executor_::Next");
    aht_.InsertCombine(MakeAggregateKey(&tuple), MakeAggregateValue(&tuple));
  }
  // 空表且没有group by
  if (aht_.Begin() == aht_.End() && plan_->GetGroupBys().empty()) {
    // LOG_DEBUG("AggregationExecutor::Empty table");
    aht_.ht_.insert({MakeAggregateKey(&tuple), aht_.GenerateInitialAggregateValue()});
  }
  aht_iterator_ = aht_.Begin();
  // LOG_DEBUG("AggregationExecutor::Init done");
  is_init_ = true;
}

auto AggregationExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  if (!is_init_) {
    Init();
  }
  std::vector<Value> values;
  std::vector<Value> keys;

  if (aht_iterator_ != aht_.End()) {
    // 将key , val 组合成tuple , 传出, 判断是否可以传出
    // group by字段在前，聚合字段在后
    // LOG_DEBUG("Next1");
    values = aht_iterator_.Key().group_bys_;
    values.insert(values.end(), aht_iterator_.Val().aggregates_.begin(), aht_iterator_.Val().aggregates_.end());
    // LOG_DEBUG("Next2, schema=%s", GetOutputSchema().ToString().c_str());
    auto agg_output_schema =
        AggregationPlanNode::InferAggSchema(plan_->GetGroupBys(), plan_->GetAggregates(), plan_->GetAggregateTypes());
    *tuple = Tuple(values, &agg_output_schema);
    // *tuple = Tuple(values, &GetOutputSchema());

    ++aht_iterator_;
    // LOG_DEBUG("AggregationExecutor::Next true");
    return true;
  }
  // LOG_DEBUG("AggregationExecutor::Next false");
  return false;
}

auto AggregationExecutor::GetChildExecutor() const -> const AbstractExecutor * { return child_executor_.get(); }

}  // namespace bustub
